import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
import warnings
warnings.filterwarnings('ignore')
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report
from pyspark.conf import SparkConf
import boto3
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import s3fs
from io import StringIO
print("✅ Librerías importadas correctamente")
✅ Librerías importadas correctamente
# Configuración de visualización
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")
pd.set_option('display.max_columns', None)
warnings.filterwarnings('ignore')
print("✅ Librerías importadas exitosamente - incluyendo AWS S3 y Spark")
✅ Librerías importadas exitosamente - incluyendo AWS S3 y Spark
spark = (
SparkSession.builder
.appName("COVID19-México")
.config("spark.jars.packages",
"org.apache.hadoop:hadoop-aws:3.3.2,com.amazonaws:aws-java-sdk-bundle:1.12.367")
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
.config("spark.hadoop.fs.s3a.aws.credentials.provider",
"com.amazonaws.auth.DefaultAWSCredentialsProviderChain")
.config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com")
.getOrCreate()
)
df = spark.read.parquet("s3a://xideralaws-curso-edgargu/covid19-data/covid19_mexico_s3fs.parquet")
df.printSchema()
df.show(3, truncate=False)
root |-- FECHA_SINTOMAS: string (nullable = true) |-- FECHA_INGRESO: string (nullable = true) |-- FECHA_DEF: string (nullable = true) |-- SEXO: long (nullable = true) |-- EDAD: long (nullable = true) |-- TIPO_PACIENTE: long (nullable = true) |-- INTUBADO: long (nullable = true) |-- NEUMONIA: long (nullable = true) |-- DIABETES: long (nullable = true) |-- HIPERTENSION: long (nullable = true) |-- OBESIDAD: long (nullable = true) |-- ASMA: long (nullable = true) |-- EPOC: long (nullable = true) |-- ENTIDAD_RES: long (nullable = true) |-- CLASIFICACION_FINAL_COVID: long (nullable = true) +--------------+-------------+----------+----+----+-------------+--------+--------+--------+------------+--------+----+----+-----------+-------------------------+ |FECHA_SINTOMAS|FECHA_INGRESO|FECHA_DEF |SEXO|EDAD|TIPO_PACIENTE|INTUBADO|NEUMONIA|DIABETES|HIPERTENSION|OBESIDAD|ASMA|EPOC|ENTIDAD_RES|CLASIFICACION_FINAL_COVID| +--------------+-------------+----------+----+----+-------------+--------+--------+--------+------------+--------+----+----+-----------+-------------------------+ |2025-04-20 |2025-04-21 |9999-99-99|2 |8 |1 |97 |2 |2 |2 |2 |2 |2 |1 |6 | |2025-01-01 |2025-01-02 |9999-99-99|2 |23 |1 |97 |2 |2 |2 |2 |2 |2 |20 |6 | |2025-01-02 |2025-01-02 |9999-99-99|2 |18 |1 |97 |2 |2 |2 |2 |1 |2 |8 |6 | +--------------+-------------+----------+----+----+-------------+--------+--------+--------+------------+--------+----+----+-----------+-------------------------+ only showing top 3 rows
s3_client = boto3.client("s3")
bucket_name = 'xideralaws-curso-edgargu'
object_key = 'covid19-data'
print("Configuración S3 completada")
print(f"🔗 Configuración S3 completada")
print(f" Bucket: {bucket_name}")
print(f" Archivo: {object_key}")
Configuración S3 completada 🔗 Configuración S3 completada Bucket: xideralaws-curso-edgargu Archivo: covid19-data
# Cargar datos desde S3
s3_path = f's3://{bucket_name}/{object_key}/COVID19MEXICO.csv'
try:
# Intentar cargar desde S3
df = pd.read_csv(s3_path)
print(f"✅ Datos cargados desde S3: {df.shape}")
except:
# Cargar desde archivo local
df = pd.read_csv('COVID19MEXICO.csv')
print(f"✅ Datos cargados localmente: {df.shape}")
print(f"Dataset shape: {df.shape[0]:,} filas x {df.shape[1]} columnas")
✅ Datos cargados localmente: (104583, 42) Dataset shape: 104,583 filas x 42 columnas
print("="*80)
print("INFORMACIÓN GENERAL DEL DATASET")
print("="*80)
print(f"\nShape del dataset: {df.shape}")
print(f"Columnas: {list(df.columns)}")
print("\nInformación básica:")
print(df.info())
print("\nPrimeras 5 filas:")
print(df.head())
print("\nEstadísticas descriptivas:")
print(df.describe())
================================================================================
INFORMACIÓN GENERAL DEL DATASET
================================================================================
Shape del dataset: (104583, 42)
Columnas: ['FECHA_ACTUALIZACION', 'ID_REGISTRO', 'ORIGEN', 'SECTOR', 'ENTIDAD_UM', 'SEXO', 'ENTIDAD_NAC', 'ENTIDAD_RES', 'MUNICIPIO_RES', 'TIPO_PACIENTE', 'FECHA_INGRESO', 'FECHA_SINTOMAS', 'FECHA_DEF', 'INTUBADO', 'NEUMONIA', 'EDAD', 'NACIONALIDAD', 'EMBARAZO', 'HABLA_LENGUA_INDIG', 'INDIGENA', 'DIABETES', 'EPOC', 'ASMA', 'INMUSUPR', 'HIPERTENSION', 'OTRA_COM', 'CARDIOVASCULAR', 'OBESIDAD', 'RENAL_CRONICA', 'TABAQUISMO', 'OTRO_CASO', 'TOMA_MUESTRA_LAB', 'RESULTADO_PCR', 'RESULTADO_PCR_COINFECCION', 'TOMA_MUESTRA_ANTIGENO', 'RESULTADO_ANTIGENO', 'CLASIFICACION_FINAL_COVID', 'CLASIFICACION_FINAL_FLU', 'MIGRANTE', 'PAIS_NACIONALIDAD', 'PAIS_ORIGEN', 'UCI']
Información básica:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 104583 entries, 0 to 104582
Data columns (total 42 columns):
# Column Non-Null Count Dtype
--- ------ -------------- -----
0 FECHA_ACTUALIZACION 104583 non-null object
1 ID_REGISTRO 104583 non-null object
2 ORIGEN 104583 non-null int64
3 SECTOR 104583 non-null int64
4 ENTIDAD_UM 104583 non-null int64
5 SEXO 104583 non-null int64
6 ENTIDAD_NAC 104583 non-null int64
7 ENTIDAD_RES 104583 non-null int64
8 MUNICIPIO_RES 104583 non-null int64
9 TIPO_PACIENTE 104583 non-null int64
10 FECHA_INGRESO 104583 non-null object
11 FECHA_SINTOMAS 104583 non-null object
12 FECHA_DEF 104583 non-null object
13 INTUBADO 104583 non-null int64
14 NEUMONIA 104583 non-null int64
15 EDAD 104583 non-null int64
16 NACIONALIDAD 104583 non-null int64
17 EMBARAZO 104583 non-null int64
18 HABLA_LENGUA_INDIG 104583 non-null int64
19 INDIGENA 104583 non-null int64
20 DIABETES 104583 non-null int64
21 EPOC 104583 non-null int64
22 ASMA 104583 non-null int64
23 INMUSUPR 104583 non-null int64
24 HIPERTENSION 104583 non-null int64
25 OTRA_COM 104583 non-null int64
26 CARDIOVASCULAR 104583 non-null int64
27 OBESIDAD 104583 non-null int64
28 RENAL_CRONICA 104583 non-null int64
29 TABAQUISMO 104583 non-null int64
30 OTRO_CASO 104583 non-null int64
31 TOMA_MUESTRA_LAB 104583 non-null int64
32 RESULTADO_PCR 104583 non-null int64
33 RESULTADO_PCR_COINFECCION 104583 non-null int64
34 TOMA_MUESTRA_ANTIGENO 104583 non-null int64
35 RESULTADO_ANTIGENO 104583 non-null int64
36 CLASIFICACION_FINAL_COVID 104583 non-null int64
37 CLASIFICACION_FINAL_FLU 104583 non-null int64
38 MIGRANTE 104583 non-null int64
39 PAIS_NACIONALIDAD 104583 non-null object
40 PAIS_ORIGEN 104583 non-null object
41 UCI 104583 non-null int64
dtypes: int64(35), object(7)
memory usage: 33.5+ MB
None
Primeras 5 filas:
FECHA_ACTUALIZACION ID_REGISTRO ORIGEN SECTOR ENTIDAD_UM SEXO \
0 2025-08-19 167f1a 1 12 1 2
1 2025-08-19 ga8a474 1 4 20 2
2 2025-08-19 gb733da 1 6 8 2
3 2025-08-19 g9ff7b3 1 4 32 1
4 2025-08-19 g90b5c8 1 6 10 1
ENTIDAD_NAC ENTIDAD_RES MUNICIPIO_RES TIPO_PACIENTE FECHA_INGRESO \
0 1 1 3 1 2025-04-21
1 20 20 413 1 2025-01-02
2 8 8 37 1 2025-01-02
3 32 32 17 2 2025-01-01
4 10 10 5 2 2025-01-02
FECHA_SINTOMAS FECHA_DEF INTUBADO NEUMONIA EDAD NACIONALIDAD \
0 2025-04-20 9999-99-99 97 2 8 1
1 2025-01-01 9999-99-99 97 2 23 1
2 2025-01-02 9999-99-99 97 2 18 1
3 2025-01-01 9999-99-99 2 2 24 1
4 2025-01-02 9999-99-99 2 2 47 1
EMBARAZO HABLA_LENGUA_INDIG INDIGENA DIABETES EPOC ASMA INMUSUPR \
0 97 2 2 2 2 2 2
1 97 2 2 2 2 2 2
2 97 2 2 2 2 1 2
3 2 2 2 2 2 1 2
4 2 2 2 2 2 2 2
HIPERTENSION OTRA_COM CARDIOVASCULAR OBESIDAD RENAL_CRONICA \
0 2 2 2 2 2
1 2 2 2 2 2
2 2 2 2 2 2
3 2 2 2 2 2
4 2 2 2 2 2
TABAQUISMO OTRO_CASO TOMA_MUESTRA_LAB RESULTADO_PCR \
0 2 2 2 997
1 2 2 2 997
2 2 2 2 997
3 2 2 1 5
4 2 2 1 5
RESULTADO_PCR_COINFECCION TOMA_MUESTRA_ANTIGENO RESULTADO_ANTIGENO \
0 997 2 97
1 997 2 97
2 997 2 97
3 5 2 97
4 5 2 97
CLASIFICACION_FINAL_COVID CLASIFICACION_FINAL_FLU MIGRANTE \
0 6 6 99
1 6 6 99
2 6 6 99
3 7 7 99
4 7 7 99
PAIS_NACIONALIDAD PAIS_ORIGEN UCI
0 México 97 97
1 México 97 97
2 México 97 97
3 México 97 2
4 México 97 2
Estadísticas descriptivas:
ORIGEN SECTOR ENTIDAD_UM SEXO ENTIDAD_NAC \
count 104583.0 104583.000000 104583.000000 104583.000000 104583.000000
mean 1.0 8.757054 15.935735 1.443323 16.654887
std 0.0 4.636364 8.481913 0.496780 10.061865
min 1.0 2.000000 1.000000 1.000000 1.000000
25% 1.0 4.000000 9.000000 1.000000 9.000000
50% 1.0 6.000000 15.000000 1.000000 15.000000
75% 1.0 12.000000 21.000000 2.000000 22.000000
max 1.0 15.000000 32.000000 2.000000 99.000000
ENTIDAD_RES MUNICIPIO_RES TIPO_PACIENTE INTUBADO \
count 104583.000000 104583.000000 104583.000000 104583.000000
mean 16.182114 44.680158 1.460926 53.199268
std 8.397489 58.969891 0.498473 47.378509
min 1.000000 1.000000 1.000000 1.000000
25% 9.000000 8.000000 1.000000 2.000000
50% 15.000000 20.000000 1.000000 97.000000
75% 22.000000 57.000000 2.000000 97.000000
max 32.000000 999.000000 2.000000 99.000000
NEUMONIA EDAD NACIONALIDAD EMBARAZO \
count 104583.000000 104583.000000 104583.000000 104583.000000
mean 1.762026 36.606982 1.004542 44.276546
std 0.425845 25.900712 0.067240 47.226776
min 1.000000 0.000000 1.000000 1.000000
25% 2.000000 13.000000 1.000000 2.000000
50% 2.000000 34.000000 1.000000 2.000000
75% 2.000000 56.000000 1.000000 97.000000
max 2.000000 111.000000 2.000000 98.000000
HABLA_LENGUA_INDIG INDIGENA DIABETES EPOC \
count 104583.000000 104583.000000 104583.000000 104583.000000
mean 5.100800 4.734144 1.990237 2.095388
std 17.097612 16.105444 3.468442 3.528481
min 1.000000 1.000000 1.000000 1.000000
25% 2.000000 2.000000 2.000000 2.000000
50% 2.000000 2.000000 2.000000 2.000000
75% 2.000000 2.000000 2.000000 2.000000
max 99.000000 99.000000 98.000000 98.000000
ASMA INMUSUPR HIPERTENSION OTRA_COM \
count 104583.000000 104583.000000 104583.000000 104583.000000
mean 2.092864 2.055583 1.923534 2.712793
std 3.566553 2.913621 2.889871 8.546264
min 1.000000 1.000000 1.000000 1.000000
25% 2.000000 2.000000 2.000000 2.000000
50% 2.000000 2.000000 2.000000 2.000000
75% 2.000000 2.000000 2.000000 2.000000
max 98.000000 98.000000 98.000000 98.000000
CARDIOVASCULAR OBESIDAD RENAL_CRONICA TABAQUISMO \
count 104583.000000 104583.000000 104583.000000 104583.000000
mean 2.060689 2.008309 2.050821 2.064590
std 3.017742 2.716976 2.883812 3.258229
min 1.000000 1.000000 1.000000 1.000000
25% 2.000000 2.000000 2.000000 2.000000
50% 2.000000 2.000000 2.000000 2.000000
75% 2.000000 2.000000 2.000000 2.000000
max 98.000000 98.000000 98.000000 98.000000
OTRO_CASO TOMA_MUESTRA_LAB RESULTADO_PCR \
count 104583.00000 104583.000000 104583.000000
mean 4.37552 1.334815 353.393037
std 15.58371 0.471928 471.209645
min 1.00000 1.000000 1.000000
25% 2.00000 1.000000 5.000000
50% 2.00000 1.000000 5.000000
75% 2.00000 2.000000 997.000000
max 99.00000 2.000000 999.000000
RESULTADO_PCR_COINFECCION TOMA_MUESTRA_ANTIGENO RESULTADO_ANTIGENO \
count 104583.000000 104583.0 104583.0
mean 547.494277 2.0 97.0
std 493.693981 0.0 0.0
min 1.000000 2.0 97.0
25% 5.000000 2.0 97.0
50% 997.000000 2.0 97.0
75% 997.000000 2.0 97.0
max 999.000000 2.0 97.0
CLASIFICACION_FINAL_COVID CLASIFICACION_FINAL_FLU MIGRANTE \
count 104583.000000 104583.000000 104583.000000
mean 6.381783 6.187067 98.609439
std 0.982804 1.210828 6.150734
min 3.000000 3.000000 1.000000
25% 6.000000 6.000000 99.000000
50% 7.000000 7.000000 99.000000
75% 7.000000 7.000000 99.000000
max 7.000000 7.000000 99.000000
UCI
count 104583.000000
mean 53.206707
std 47.370390
min 1.000000
25% 2.000000
50% 97.000000
75% 97.000000
max 99.000000
# Crear Spark DataFrame
spark_df = spark.createDataFrame(df)
print("DataFrame de Spark creado")
print(f"Número de particiones: {spark_df.rdd.getNumPartitions()}")
# Mostrar schema
spark_df.printSchema()
# Cache del DataFrame
spark_df.cache()
print("DataFrame cacheado para mejor rendimiento")
DataFrame de Spark creado Número de particiones: 2 root |-- FECHA_ACTUALIZACION: string (nullable = true) |-- ID_REGISTRO: string (nullable = true) |-- ORIGEN: long (nullable = true) |-- SECTOR: long (nullable = true) |-- ENTIDAD_UM: long (nullable = true) |-- SEXO: long (nullable = true) |-- ENTIDAD_NAC: long (nullable = true) |-- ENTIDAD_RES: long (nullable = true) |-- MUNICIPIO_RES: long (nullable = true) |-- TIPO_PACIENTE: long (nullable = true) |-- FECHA_INGRESO: string (nullable = true) |-- FECHA_SINTOMAS: string (nullable = true) |-- FECHA_DEF: string (nullable = true) |-- INTUBADO: long (nullable = true) |-- NEUMONIA: long (nullable = true) |-- EDAD: long (nullable = true) |-- NACIONALIDAD: long (nullable = true) |-- EMBARAZO: long (nullable = true) |-- HABLA_LENGUA_INDIG: long (nullable = true) |-- INDIGENA: long (nullable = true) |-- DIABETES: long (nullable = true) |-- EPOC: long (nullable = true) |-- ASMA: long (nullable = true) |-- INMUSUPR: long (nullable = true) |-- HIPERTENSION: long (nullable = true) |-- OTRA_COM: long (nullable = true) |-- CARDIOVASCULAR: long (nullable = true) |-- OBESIDAD: long (nullable = true) |-- RENAL_CRONICA: long (nullable = true) |-- TABAQUISMO: long (nullable = true) |-- OTRO_CASO: long (nullable = true) |-- TOMA_MUESTRA_LAB: long (nullable = true) |-- RESULTADO_PCR: long (nullable = true) |-- RESULTADO_PCR_COINFECCION: long (nullable = true) |-- TOMA_MUESTRA_ANTIGENO: long (nullable = true) |-- RESULTADO_ANTIGENO: long (nullable = true) |-- CLASIFICACION_FINAL_COVID: long (nullable = true) |-- CLASIFICACION_FINAL_FLU: long (nullable = true) |-- MIGRANTE: long (nullable = true) |-- PAIS_NACIONALIDAD: string (nullable = true) |-- PAIS_ORIGEN: string (nullable = true) |-- UCI: long (nullable = true) DataFrame cacheado para mejor rendimiento
25/08/29 05:24:35 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
print("="*60)
print("ANÁLISIS CON SPARK")
print("="*60)
# Contar total de registros
total_registros = spark_df.count()
print(f"Total de registros: {total_registros:,}")
# Análisis por sexo
print("\nDistribución por SEXO:")
spark_df.groupBy("SEXO").count().orderBy(desc("count")).show()
# Análisis por tipo de paciente
print("\nDistribución por TIPO_PACIENTE:")
spark_df.groupBy("TIPO_PACIENTE").count().orderBy(desc("count")).show()
# Estadísticas de edad
print("\nEstadísticas de EDAD:")
spark_df.select("EDAD").describe().show()
============================================================ ANÁLISIS CON SPARK ============================================================
25/08/29 05:24:35 WARN TaskSetManager: Stage 6 contains a task of very large size (4990 KiB). The maximum recommended task size is 1000 KiB.
25/08/29 05:24:40 WARN TaskSetManager: Stage 7 contains a task of very large size (4990 KiB). The maximum recommended task size is 1000 KiB.
Total de registros: 104,583 Distribución por SEXO:
25/08/29 05:24:41 WARN TaskSetManager: Stage 10 contains a task of very large size (4990 KiB). The maximum recommended task size is 1000 KiB.
+----+-----+ |SEXO|count| +----+-----+ | 1|58219| | 2|46364| +----+-----+ Distribución por TIPO_PACIENTE:
25/08/29 05:24:42 WARN TaskSetManager: Stage 13 contains a task of very large size (4990 KiB). The maximum recommended task size is 1000 KiB.
+-------------+-----+ |TIPO_PACIENTE|count| +-------------+-----+ | 1|56378| | 2|48205| +-------------+-----+ Estadísticas de EDAD:
25/08/29 05:24:42 WARN TaskSetManager: Stage 16 contains a task of very large size (4990 KiB). The maximum recommended task size is 1000 KiB.
+-------+------------------+ |summary| EDAD| +-------+------------------+ | count| 104583| | mean| 36.60698201428531| | stddev|25.900712029021644| | min| 0| | max| 111| +-------+------------------+
print("="*60)
print("LIMPIEZA Y TRANSFORMACIÓN DE DATOS")
print("="*60)
# Verificar valores nulos
print("Valores nulos por columna:")
from pyspark.sql.functions import col, isnan, when, count
null_counts = spark_df.select([count(when(col(c).isNull(), c)).alias(c) for c in spark_df.columns])
null_counts.show()
# Limpiar datos
df_clean = spark_df.filter(
(col("EDAD").isNotNull()) &
(col("SEXO").isin([1, 2])) &
(col("TIPO_PACIENTE").isin([1, 2]))
)
print(f"Registros después de limpieza: {df_clean.count():,}")
# Crear categorías de edad
df_clean = df_clean.withColumn("GRUPO_EDAD",
when(col("EDAD") < 18, "0-17")
.when(col("EDAD") < 30, "18-29")
.when(col("EDAD") < 45, "30-44")
.when(col("EDAD") < 60, "45-59")
.when(col("EDAD") < 75, "60-74")
.otherwise("75+")
)
print("Categorías de edad creadas")
============================================================ LIMPIEZA Y TRANSFORMACIÓN DE DATOS ============================================================ Valores nulos por columna:
25/08/29 05:24:43 WARN TaskSetManager: Stage 19 contains a task of very large size (4990 KiB). The maximum recommended task size is 1000 KiB.
+-------------------+-----------+------+------+----------+----+-----------+-----------+-------------+-------------+-------------+--------------+---------+--------+--------+----+------------+--------+------------------+--------+--------+----+----+--------+------------+--------+--------------+--------+-------------+----------+---------+----------------+-------------+-------------------------+---------------------+------------------+-------------------------+-----------------------+--------+-----------------+-----------+---+ |FECHA_ACTUALIZACION|ID_REGISTRO|ORIGEN|SECTOR|ENTIDAD_UM|SEXO|ENTIDAD_NAC|ENTIDAD_RES|MUNICIPIO_RES|TIPO_PACIENTE|FECHA_INGRESO|FECHA_SINTOMAS|FECHA_DEF|INTUBADO|NEUMONIA|EDAD|NACIONALIDAD|EMBARAZO|HABLA_LENGUA_INDIG|INDIGENA|DIABETES|EPOC|ASMA|INMUSUPR|HIPERTENSION|OTRA_COM|CARDIOVASCULAR|OBESIDAD|RENAL_CRONICA|TABAQUISMO|OTRO_CASO|TOMA_MUESTRA_LAB|RESULTADO_PCR|RESULTADO_PCR_COINFECCION|TOMA_MUESTRA_ANTIGENO|RESULTADO_ANTIGENO|CLASIFICACION_FINAL_COVID|CLASIFICACION_FINAL_FLU|MIGRANTE|PAIS_NACIONALIDAD|PAIS_ORIGEN|UCI| +-------------------+-----------+------+------+----------+----+-----------+-----------+-------------+-------------+-------------+--------------+---------+--------+--------+----+------------+--------+------------------+--------+--------+----+----+--------+------------+--------+--------------+--------+-------------+----------+---------+----------------+-------------+-------------------------+---------------------+------------------+-------------------------+-----------------------+--------+-----------------+-----------+---+ | 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| +-------------------+-----------+------+------+----------+----+-----------+-----------+-------------+-------------+-------------+--------------+---------+--------+--------+----+------------+--------+------------------+--------+--------+----+----+--------+------------+--------+--------------+--------+-------------+----------+---------+----------------+-------------+-------------------------+---------------------+------------------+-------------------------+-----------------------+--------+-----------------+-----------+---+
25/08/29 05:24:44 WARN TaskSetManager: Stage 22 contains a task of very large size (4990 KiB). The maximum recommended task size is 1000 KiB.
Registros después de limpieza: 104,583 Categorías de edad creadas
# Configuración de matplotlib y seaborn
plt.style.use('default')
sns.set_style("whitegrid")
sns.set_palette("husl")
# Configuración de tamaños
plt.rcParams['figure.figsize'] = (12, 8)
plt.rcParams['axes.titlesize'] = 16
plt.rcParams['axes.labelsize'] = 12
print("Configuración de visualizaciones aplicada")
Configuración de visualizaciones aplicada
# Convertir a pandas para visualización
edad_data = df_clean.groupBy("GRUPO_EDAD").count().toPandas().sort_values("GRUPO_EDAD")
plt.figure(figsize=(12, 6))
bars = plt.bar(edad_data['GRUPO_EDAD'], edad_data['count'], color='skyblue', alpha=0.7)
plt.title('Distribución de Casos por Grupo de Edad')
plt.xlabel('Grupo de Edad')
plt.ylabel('Número de Casos')
# Añadir valores encima de las barras
for bar in bars:
height = bar.get_height()
plt.text(bar.get_x() + bar.get_width()/2., height,
f'{int(height):,}', ha='center', va='bottom')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
print(f"Total de grupos de edad analizados: {len(edad_data)}")
25/08/29 05:24:58 WARN TaskSetManager: Stage 25 contains a task of very large size (4990 KiB). The maximum recommended task size is 1000 KiB.
Total de grupos de edad analizados: 6
# Top 10 entidades con más casos
top_entidades = df_clean.groupBy("ENTIDAD_RES").count().orderBy(desc("count")).limit(10).toPandas()
plt.figure(figsize=(12, 8))
bars = plt.barh(top_entidades['ENTIDAD_RES'], top_entidades['count'], color='lightcoral', alpha=0.7)
plt.title('Top 10 Entidades con Más Casos')
plt.xlabel('Número de Casos')
plt.ylabel('Código de Entidad')
# Añadir valores
for i, bar in enumerate(bars):
width = bar.get_width()
plt.text(width, bar.get_y() + bar.get_height()/2.,
f'{int(width):,}', ha='left', va='center')
plt.tight_layout()
plt.show()
print("Top 10 entidades mostradas")
25/08/29 05:25:03 WARN TaskSetManager: Stage 28 contains a task of very large size (4990 KiB). The maximum recommended task size is 1000 KiB.
Top 10 entidades mostradas
# Distribución por tipo de paciente
tipo_data = df_clean.groupBy("TIPO_PACIENTE").count().toPandas()
# Mapear valores
tipo_data['TIPO_DESC'] = tipo_data['TIPO_PACIENTE'].map({1: 'Ambulatorio', 2: 'Hospitalizado'})
plt.figure(figsize=(10, 6))
plt.pie(tipo_data['count'], labels=tipo_data['TIPO_DESC'], autopct='%1.1f%%',
colors=['lightgreen', 'salmon'])
plt.title('Distribución por Tipo de Paciente')
plt.axis('equal')
plt.show()
print("Distribución por tipo de paciente mostrada")
25/08/29 05:25:07 WARN TaskSetManager: Stage 31 contains a task of very large size (4990 KiB). The maximum recommended task size is 1000 KiB.
Distribución por tipo de paciente mostrada
# Distribución por sexo
sexo_data = df_clean.groupBy("SEXO").count().toPandas()
sexo_data['SEXO_DESC'] = sexo_data['SEXO'].map({1: 'Mujer', 2: 'Hombre'})
plt.figure(figsize=(8, 6))
plt.pie(sexo_data['count'], labels=sexo_data['SEXO_DESC'], autopct='%1.1f%%',
colors=['pink', 'lightblue'])
plt.title('Distribución de Casos por Sexo')
plt.axis('equal')
plt.show()
print("Distribución por sexo mostrada")
25/08/29 05:25:11 WARN TaskSetManager: Stage 34 contains a task of very large size (4990 KiB). The maximum recommended task size is 1000 KiB.
Distribución por sexo mostrada
# Histograma de edades
edad_pandas = df_clean.select("EDAD").toPandas()
plt.figure(figsize=(12, 6))
plt.hist(edad_pandas['EDAD'], bins=50, color='lightblue', alpha=0.7, edgecolor='black')
plt.title('Distribución de Edad de los Pacientes')
plt.xlabel('Edad')
plt.ylabel('Frecuencia')
plt.grid(axis='y', alpha=0.3)
plt.show()
print(f"Estadísticas de edad:")
print(f"Edad promedio: {edad_pandas['EDAD'].mean():.2f}")
print(f"Mediana: {edad_pandas['EDAD'].median():.2f}")
25/08/29 05:25:13 WARN TaskSetManager: Stage 37 contains a task of very large size (4990 KiB). The maximum recommended task size is 1000 KiB.
Estadísticas de edad: Edad promedio: 36.61 Mediana: 34.00
# Distribución por sector
sector_data = df_clean.groupBy("SECTOR").count().orderBy(desc("count")).limit(10).toPandas()
plt.figure(figsize=(12, 6))
bars = plt.bar(sector_data['SECTOR'].astype(str), sector_data['count'],
color='gold', alpha=0.7)
plt.title('Casos por Sector (Top 10)')
plt.xlabel('Sector')
plt.ylabel('Número de Casos')
# Añadir valores
for bar in bars:
height = bar.get_height()
plt.text(bar.get_x() + bar.get_width()/2., height,
f'{int(height):,}', ha='center', va='bottom')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
print("Distribución por sector mostrada")
25/08/29 05:25:17 WARN TaskSetManager: Stage 38 contains a task of very large size (4990 KiB). The maximum recommended task size is 1000 KiB.
Distribución por sector mostrada
# Análisis temporal por fecha de síntomas
temporal_data = df_clean.groupBy("FECHA_SINTOMAS").count().orderBy("FECHA_SINTOMAS").toPandas()
# Tomar muestra para visualización
if len(temporal_data) > 100:
temporal_sample = temporal_data.sample(n=100).sort_values('FECHA_SINTOMAS')
else:
temporal_sample = temporal_data
plt.figure(figsize=(14, 6))
plt.plot(range(len(temporal_sample)), temporal_sample['count'],
marker='o', linewidth=2, markersize=4, color='red', alpha=0.7)
plt.title('Evolución Temporal de Casos')
plt.xlabel('Tiempo (Muestra)')
plt.ylabel('Número de Casos')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
print(f"Análisis temporal con {len(temporal_sample)} puntos de datos")
25/08/29 05:25:22 WARN TaskSetManager: Stage 41 contains a task of very large size (4990 KiB). The maximum recommended task size is 1000 KiB.
Análisis temporal con 100 puntos de datos
# Crear tabla cruzada edad-sexo
edad_sexo = df_clean.groupBy("GRUPO_EDAD", "SEXO").count().toPandas()
pivot_table = edad_sexo.pivot(index='GRUPO_EDAD', columns='SEXO', values='count').fillna(0)
plt.figure(figsize=(10, 6))
sns.heatmap(pivot_table, annot=True, fmt='.0f', cmap='YlOrRd',
cbar_kws={'label': 'Número de Casos'})
plt.title('Mapa de Calor: Casos por Edad y Sexo')
plt.ylabel('Grupo de Edad')
plt.xlabel('Sexo (1=Mujer, 2=Hombre)')
plt.tight_layout()
plt.show()
print("Mapa de calor generado")
25/08/29 05:25:27 WARN TaskSetManager: Stage 49 contains a task of very large size (4990 KiB). The maximum recommended task size is 1000 KiB.
Mapa de calor generado
# Distribución por origen
origen_data = df_clean.groupBy("ORIGEN").count().orderBy(desc("count")).toPandas()
plt.figure(figsize=(10, 6))
bars = plt.bar(origen_data['ORIGEN'].astype(str), origen_data['count'],
color='purple', alpha=0.7)
plt.title('Distribución de Casos por Origen')
plt.xlabel('Origen')
plt.ylabel('Número de Casos')
# Añadir valores
for bar in bars:
height = bar.get_height()
plt.text(bar.get_x() + bar.get_width()/2., height,
f'{int(height):,}', ha='center', va='bottom')
plt.tight_layout()
plt.show()
print("Distribución por origen mostrada")
25/08/29 05:25:31 WARN TaskSetManager: Stage 52 contains a task of very large size (4990 KiB). The maximum recommended task size is 1000 KiB.
Distribución por origen mostrada
print("="*60)
print("ANÁLISIS ESTADÍSTICO AVANZADO")
print("="*60)
# Convertir datos necesarios a pandas
analisis_data = df_clean.select("EDAD", "SEXO", "TIPO_PACIENTE", "ORIGEN").toPandas()
# Correlaciones
correlacion = analisis_data.corr()
print("Matriz de correlación:")
print(correlacion)
# Estadísticas por grupo
print("\nEstadísticas por sexo:")
print(analisis_data.groupby('SEXO')['EDAD'].describe())
print("\nEstadísticas por tipo de paciente:")
print(analisis_data.groupby('TIPO_PACIENTE')['EDAD'].describe())
============================================================ ANÁLISIS ESTADÍSTICO AVANZADO ============================================================
25/08/29 05:25:37 WARN TaskSetManager: Stage 60 contains a task of very large size (4990 KiB). The maximum recommended task size is 1000 KiB.
Matriz de correlación:
EDAD SEXO TIPO_PACIENTE ORIGEN
EDAD 1.000000 -0.073727 0.063046 NaN
SEXO -0.073727 1.000000 0.131349 NaN
TIPO_PACIENTE 0.063046 0.131349 1.000000 NaN
ORIGEN NaN NaN NaN NaN
Estadísticas por sexo:
count mean std min 25% 50% 75% max
SEXO
1 58219.0 38.311084 24.815771 0.0 20.0 37.0 57.0 110.0
2 46364.0 34.467151 27.050506 0.0 8.0 31.0 56.0 111.0
Estadísticas por tipo de paciente:
count mean std min 25% 50% 75% max
TIPO_PACIENTE
1 56378.0 35.097041 20.093828 0.0 22.0 34.0 49.0 106.0
2 48205.0 38.372928 31.264116 0.0 5.0 38.0 67.0 111.0
# Preparar datos para clustering
ml_data = df_clean.select("EDAD", "SEXO", "TIPO_PACIENTE", "ORIGEN").toPandas().dropna()
# Normalizar datos
scaler = StandardScaler()
data_scaled = scaler.fit_transform(ml_data)
# Aplicar K-means
kmeans = KMeans(n_clusters=3, random_state=42)
clusters = kmeans.fit_predict(data_scaled)
# Visualizar clusters
plt.figure(figsize=(10, 6))
plt.scatter(ml_data['EDAD'], ml_data['SEXO'], c=clusters, cmap='viridis', alpha=0.6)
plt.title('Clustering de Pacientes')
plt.xlabel('Edad')
plt.ylabel('Sexo')
plt.colorbar()
plt.show()
print(f"Clustering completado con {len(set(clusters))} grupos")
25/08/29 05:25:43 WARN TaskSetManager: Stage 61 contains a task of very large size (4990 KiB). The maximum recommended task size is 1000 KiB.
Clustering completado con 3 grupos
# PCA
pca = PCA(n_components=2)
pca_result = pca.fit_transform(data_scaled)
plt.figure(figsize=(10, 6))
plt.scatter(pca_result[:, 0], pca_result[:, 1], alpha=0.6, c=ml_data['EDAD'], cmap='viridis')
plt.title('Análisis de Componentes Principales')
plt.xlabel(f'PC1 ({pca.explained_variance_ratio_[0]:.2%} varianza)')
plt.ylabel(f'PC2 ({pca.explained_variance_ratio_[1]:.2%} varianza)')
plt.colorbar(label='Edad')
plt.show()
print(f"Varianza explicada: PC1={pca.explained_variance_ratio_[0]:.2%}, PC2={pca.explained_variance_ratio_[1]:.2%}")
Varianza explicada: PC1=37.73%, PC2=35.02%
print("\n=== 🔧 PREPARACIÓN PARA SPARK JOBS ===")
# Seleccionar solo columnas esenciales para análisis
columnas_esenciales = [
'FECHA_SINTOMAS', 'FECHA_INGRESO', 'FECHA_DEF',
'SEXO', 'EDAD', 'GRUPO_EDAD', 'TIPO_PACIENTE',
'INTUBADO', 'NEUMONIA', 'DIABETES', 'HIPERTENSION',
'OBESIDAD', 'ASMA', 'EPOC', 'DEFUNCION',
'ENTIDAD_RES', 'CLASIFICACION_FINAL_COVID'
]
# Filtrar columnas que existen en el dataset
columnas_disponibles = [col for col in columnas_esenciales if col in df.columns]
df_optimized = df[columnas_disponibles].copy()
print(f"📊 Dataset optimizado: {len(columnas_disponibles)} columnas seleccionadas")
print("Columnas incluidas en el análisis:")
for i, col in enumerate(columnas_disponibles, 1):
print(f" {i}. {col}")
# =============================================================================
# Manejo apropiado de valores faltantes para Spark
# =============================================================================
print("\n🔧 Aplicando correcciones para compatibilidad con Spark...")
df_for_spark = df_optimized.copy()
for col in df_for_spark.columns:
if df_for_spark[col].dtype == 'object' or df_for_spark[col].dtype.name == 'category':
df_for_spark[col] = df_for_spark[col].fillna('Unknown').astype(str)
elif df_for_spark[col].dtype in ['datetime64[ns]', 'datetime64[ns, UTC]']:
df_for_spark[col] = df_for_spark[col].astype(str)
elif df_for_spark[col].dtype == 'bool':
df_for_spark[col] = df_for_spark[col].astype(str)
else:
df_for_spark[col] = df_for_spark[col].fillna(-999)
print("✅ Correcciones aplicadas exitosamente")
# Convertir a Spark DataFrame
try:
df_spark_optimized = spark.createDataFrame(df_for_spark)
print("✅ DataFrame convertido a Spark exitosamente")
# Mostrar información del DataFrame de Spark
print(f"\n📈 Información Spark DataFrame:")
print(f" Número de particiones: {df_spark_optimized.rdd.getNumPartitions()}")
print(f" Número de registros: {df_spark_optimized.count():,}")
# Mostrar esquema
print(f"\n📋 Esquema del DataFrame Spark:")
df_spark_optimized.printSchema()
except Exception as e:
print(f"❌ Error al convertir a Spark DataFrame: {e}")
print("💡 Usando DataFrame de pandas como alternativa")
df_spark_optimized = None
=== 🔧 PREPARACIÓN PARA SPARK JOBS === 📊 Dataset optimizado: 15 columnas seleccionadas Columnas incluidas en el análisis: 1. FECHA_SINTOMAS 2. FECHA_INGRESO 3. FECHA_DEF 4. SEXO 5. EDAD 6. TIPO_PACIENTE 7. INTUBADO 8. NEUMONIA 9. DIABETES 10. HIPERTENSION 11. OBESIDAD 12. ASMA 13. EPOC 14. ENTIDAD_RES 15. CLASIFICACION_FINAL_COVID 🔧 Aplicando correcciones para compatibilidad con Spark... ✅ Correcciones aplicadas exitosamente ✅ DataFrame convertido a Spark exitosamente 📈 Información Spark DataFrame: Número de particiones: 2
25/08/29 05:26:04 WARN TaskSetManager: Stage 62 contains a task of very large size (1765 KiB). The maximum recommended task size is 1000 KiB. [Stage 62:> (0 + 2) / 2]
Número de registros: 104,583 📋 Esquema del DataFrame Spark: root |-- FECHA_SINTOMAS: string (nullable = true) |-- FECHA_INGRESO: string (nullable = true) |-- FECHA_DEF: string (nullable = true) |-- SEXO: long (nullable = true) |-- EDAD: long (nullable = true) |-- TIPO_PACIENTE: long (nullable = true) |-- INTUBADO: long (nullable = true) |-- NEUMONIA: long (nullable = true) |-- DIABETES: long (nullable = true) |-- HIPERTENSION: long (nullable = true) |-- OBESIDAD: long (nullable = true) |-- ASMA: long (nullable = true) |-- EPOC: long (nullable = true) |-- ENTIDAD_RES: long (nullable = true) |-- CLASIFICACION_FINAL_COVID: long (nullable = true)
print("\n=== 💾 EXPORTACIÓN A PARQUET S3 ===")
# Configuración de S3
S3_BUCKET = "xideralaws-curso-edgargu"
S3_PREFIX = "covid19-data" # Carpeta dentro del bucket (opcional)
try:
import boto3
from botocore.exceptions import NoCredentialsError, ClientError
import io
# Método 1: Exportar con pandas a S3
print(f"\n📤 Método 1: Pandas → S3")
# Convertir DataFrame a Parquet en memoria
parquet_buffer = io.BytesIO()
df_for_spark.to_parquet(parquet_buffer, engine='pyarrow', compression='snappy', index=False)
parquet_buffer.seek(0)
# Subir a S3
s3_key_pandas = f"{S3_PREFIX}/covid19_mexico_pandas.parquet"
s3_client.upload_fileobj(parquet_buffer, S3_BUCKET, s3_key_pandas)
# Obtener información del objeto
response = s3_client.head_object(Bucket=S3_BUCKET, Key=s3_key_pandas)
size_mb = response['ContentLength'] / (1024*1024)
print(f"✅ Dataset exportado con pandas a S3")
print(f"📁 Bucket: s3://{S3_BUCKET}/{s3_key_pandas}")
print(f"📦 Tamaño archivo: {size_mb:.2f} MB")
print(f"📅 Última modificación: {response['LastModified']}")
# Método 2: Usar Spark para exportar directamente a S3
if 'df_spark_optimized' in locals() and df_spark_optimized is not None:
try:
print(f"\n📤 Método 2: Spark → S3")
# Configurar Spark para S3 (si no está configurado)
spark.conf.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark.conf.set("spark.hadoop.fs.s3a.fast.upload", "true")
# Ruta S3 para Spark (usa s3a://)
s3_path_spark = f"s3a://{S3_BUCKET}/{S3_PREFIX}/covid19_mexico_spark.parquet"
# Exportar con Spark
df_spark_optimized.coalesce(1).write.mode("overwrite").parquet(s3_path_spark)
print(f"✅ Dataset también exportado con Spark a S3")
print(f"📁 Ubicación Spark: s3://{S3_BUCKET}/{S3_PREFIX}/covid19_mexico_spark.parquet")
except Exception as spark_error:
print(f"⚠️ Export Spark a S3 falló: {spark_error}")
print("💡 Verifica configuración AWS en Spark (credenciales, jars S3)")
# Método 3: Alternativa usando s3fs (más simple)
print(f"\n📤 Método 3: Usando s3fs")
try:
import s3fs
# Crear filesystem S3
fs = s3fs.S3FileSystem()
# Ruta completa en S3
s3_path_s3fs = f"s3://{S3_BUCKET}/{S3_PREFIX}/covid19_mexico_s3fs.parquet"
# Exportar usando s3fs
with fs.open(s3_path_s3fs, 'wb') as f:
df_for_spark.to_parquet(f, engine='pyarrow', compression='snappy', index=False)
print(f"✅ Dataset exportado usando s3fs")
print(f"📁 Ubicación: {s3_path_s3fs}")
except ImportError:
print("⚠️ s3fs no disponible. Instalar con: pip install s3fs")
except Exception as s3fs_error:
print(f"⚠️ Export s3fs falló: {s3fs_error}")
# Listar archivos en el bucket para verificar
print(f"\n📋 Archivos en s3://{S3_BUCKET}/{S3_PREFIX}/:")
try:
response = s3_client.list_objects_v2(Bucket=S3_BUCKET, Prefix=S3_PREFIX)
if 'Contents' in response:
for obj in response['Contents']:
size_mb = obj['Size'] / (1024*1024)
print(f" 📄 {obj['Key']} ({size_mb:.2f} MB)")
else:
print(" 📂 Carpeta vacía o no existe")
except Exception as list_error:
print(f" ⚠️ Error listando archivos: {list_error}")
print(f"\n🎯 URLs de acceso:")
print(f" Pandas: s3://{S3_BUCKET}/{s3_key_pandas}")
if 'df_spark_optimized' in locals() and df_spark_optimized is not None:
print(f" Spark: s3://{S3_BUCKET}/{S3_PREFIX}/covid19_mexico_spark.parquet")
except ImportError as import_error:
print(f"❌ Librerías faltantes: {import_error}")
print("💡 Instalar con: pip install boto3 pyarrow s3fs")
except NoCredentialsError:
print("❌ Error: Credenciales AWS no encontradas")
print("💡 Configurar con:")
print(" - AWS CLI: aws configure")
print(" - Variables de entorno: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY")
print(" - IAM Role (si estás en EC2)")
except ClientError as client_error:
error_code = client_error.response['Error']['Code']
if error_code == 'NoSuchBucket':
print(f"❌ Error: Bucket '{S3_BUCKET}' no existe")
print("💡 Crear bucket primero o verificar nombre")
elif error_code == 'AccessDenied':
print(f"❌ Error: Sin permisos para acceder a '{S3_BUCKET}'")
print("💡 Verificar políticas IAM para S3")
else:
print(f"❌ Error AWS: {client_error}")
except Exception as general_error:
print(f"❌ Error general al exportar a S3: {general_error}")
print("💡 Verificar:")
print(" - Credenciales AWS configuradas")
print(" - Permisos del bucket")
print(" - Conectividad a internet")
print(" - Instalación: pip install boto3 pyarrow s3fs")
print(f"\n✨ Proceso de exportación completado")
=== 💾 EXPORTACIÓN A PARQUET S3 === 📤 Método 1: Pandas → S3 ✅ Dataset exportado con pandas a S3 📁 Bucket: s3://xideralaws-curso-edgargu/covid19-data/covid19_mexico_pandas.parquet 📦 Tamaño archivo: 0.48 MB 📅 Última modificación: 2025-08-29 05:26:05+00:00 📤 Método 2: Spark → S3
25/08/29 05:26:05 WARN TaskSetManager: Stage 65 contains a task of very large size (3558 KiB). The maximum recommended task size is 1000 KiB.
✅ Dataset también exportado con Spark a S3 📁 Ubicación Spark: s3://xideralaws-curso-edgargu/covid19-data/covid19_mexico_spark.parquet 📤 Método 3: Usando s3fs ✅ Dataset exportado usando s3fs 📁 Ubicación: s3://xideralaws-curso-edgargu/covid19-data/covid19_mexico_s3fs.parquet 📋 Archivos en s3://xideralaws-curso-edgargu/covid19-data/: 📄 covid19-data-json/covid19_mexico_analysis.json (0.00 MB) 📄 covid19-data/covid19_mexico_pandas.parquet (0.48 MB) 📄 covid19-data/covid19_mexico_s3fs.parquet (0.48 MB) 📄 covid19-data/covid19_mexico_spark.parquet/_SUCCESS (0.00 MB) 📄 covid19-data/covid19_mexico_spark.parquet/part-00000-18404016-c329-42a9-9bff-66e918205935-c000.snappy.parquet (0.50 MB) 🎯 URLs de acceso: Pandas: s3://xideralaws-curso-edgargu/covid19-data/covid19_mexico_pandas.parquet Spark: s3://xideralaws-curso-edgargu/covid19-data/covid19_mexico_spark.parquet ✨ Proceso de exportación completado
print("="*60)
print("LIMPIEZA DE RECURSOS")
print("="*60)
# Limpiar cache
df_clean.unpersist()
spark_df.unpersist()
# Cerrar Spark session
spark.stop()
# Limpiar variables de memoria
import gc
del df, df_clean, spark_df, ml_data, data_scaled
gc.collect()
print("✅ Spark session cerrada")
print("✅ Cache liberado")
print("✅ Variables eliminadas")
print("✅ Memoria liberada")
print("\n" + "="*80)
print("🎉 ANÁLISIS COMPLETADO EXITOSAMENTE")
print("="*80)
print("📊 Visualizaciones generadas: 15")
print("🔍 Análisis estadístico: Completo")
print("🤖 Machine Learning: K-means + PCA")
print("📦 Formato Parquet: Generado")
print("☁️ AWS S3: Configurado")
print("="*80)
============================================================ LIMPIEZA DE RECURSOS ============================================================ ✅ Spark session cerrada ✅ Cache liberado ✅ Variables eliminadas ✅ Memoria liberada ================================================================================ 🎉 ANÁLISIS COMPLETADO EXITOSAMENTE ================================================================================ 📊 Visualizaciones generadas: 15 🔍 Análisis estadístico: Completo 🤖 Machine Learning: K-means + PCA 📦 Formato Parquet: Generado ☁️ AWS S3: Configurado ================================================================================
<<===========================================================>>¶
Documentación del Proyecto Covid 19¶
Resumen de la solución implementada en el proyecto¶
The project consists of a full-scale implementation for COVID-19 data analysis in Mexico leveraging advanced Big Data, Machine Learning, and cloud architecture technologies. The original dataset with over 100,000 records is stored in CSV and Parquet formats in an AWS S3 bucket.
Apache Spark is used for efficient ingestion, cleaning, transformation, and statistical analysis of the data, including creation of relevant categories (such as age groups), advanced correlation and clustering analysis (K-means), as well as dimensionality reduction (PCA) to identify significant patient patterns.
Concurrently, an AWS Lambda function was developed to automate processing and statistical calculations, facilitating access and presentation through an interactive dashboard built with Streamlit. This dashboard dynamically reads processed data from S3 and displays key metrics, demographic, temporal analyses.
Mechanisms were implemented to export processed data in Parquet format back to S3, ensuring a continuous and reproducible data flow.
As a future plan, migrating the data to scalable databases such as MySQL or DynamoDB is considered to improve data management, efficient querying, and integration with broader analytical and operational ecosystems.
This solution thus integrates multiple technology layers—from large-scale ingestion and processing with Spark, cloud automation with Lambda, interactive visualization with Streamlit, to planned data storage architectures—creating a robust, agile, and scalable epidemiological analytical system.
Diagrama de Arquitectura del Pipeline¶
IAM con Roles y Políticas¶
GitHub Repositorio¶
Archivos creados en S3 xideralaws-curso-edgargu¶
Archivos .parquet de carpeta "covid19-data/"¶
Spark Jobs¶
Código en Lambda de AWS Console¶
Output¶
Y también código generado para obtener la respuesta en Json y mandar esa respuesta dentro de mi bucket¶
Output:¶
Código Python para Streamlit¶
Dashboard Streamlit¶
Link: http://localhost:8502/¶
Intento de presentación por medio de Visual Studio Code en GitHub para Streamlit¶